package com.example.dobs.demo.flink.realtime.report.study.flink;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import com.example.dobs.demo.flink.realtime.report.study.flink.bean.Event;

public class RealtimeReportProcessFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {
    private volatile transient ValueState<String> requestState;
    private volatile transient ListState<String> impressionState;
    private static int timeToLiveMinutes;

    @Override
    public void open(Configuration config) throws Exception {
//
//        Configuration parameters = (Configuration)
//                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
//        timeToLiveMinutes = parameters.getInteger("time-to-live-minutes", 30);
//        timeToLiveMinutes = 30;
        timeToLiveMinutes = config.getInteger("time-to-live-minutes", 30);
        ValueStateDescriptor<String> requestStateDescriptor = new ValueStateDescriptor<>("request state descriptor", String.class);
        ListStateDescriptor<String> showClickStateDescriptor = new ListStateDescriptor<>("show click state descriptor", String.class);

        impressionState = getRuntimeContext().getListState(showClickStateDescriptor);
        requestState = getRuntimeContext().getState(requestStateDescriptor);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
        //没有匹配到
        String request = requestState.value();
        if (request != null){
            Event requestEvent = new Event().readByLine(request);
            requestEvent.price = "0";//无曝光，价格记为0
            out.collect(new Tuple2<>("no-join", requestEvent.toString()));
        }
        requestState.clear();
        impressionState.clear();
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {

        String eventTag = value.f0;

        //debug pass
//        System.out.println(value.f1);
        if (eventTag.equals("request")) {
            String request = value.f1;
            //debug pass
//            System.out.println(value.f1);

//            long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
            requestState.update(request);
            Iterable<String> impressionIter = this.impressionState.get();//拿曝光
            Event requestEvent = new Event().readByLine(request);
            if (impressionIter != null) {
                for (String impression : impressionIter) {
                    //debug not pass
//                    System.out.println(impression);
                    Event event = paddingData(requestEvent, impression);
                    out.collect(new Tuple2<>("joined", event.toString()));
                }
            }
            ctx.timerService().registerEventTimeTimer(Long.parseLong(requestEvent.timestamp) + timeToLiveMinutes * 60 * 1000);//定时器

            impressionState.clear();
        }

        if (eventTag.equals("impression")) {
            String impression = value.f1;
            //debug not pass
//            System.out.println(value.f1);

            String request = requestState.value();
            if (request == null) {
                impressionState.add(impression);
                Event impressionEvent = new Event().readByLine(impression);
                ctx.timerService().registerEventTimeTimer(Long.parseLong(impressionEvent.timestamp) + timeToLiveMinutes * 60 * 1000);
            } else {
                Event event = paddingData(request, value.f1);
                out.collect(new Tuple2<>("joined", event.toString()));
            }
        }

    }

    public Event paddingData(String request, String showClick) {
        Event requestEvent = new Event().readByLine(request);
        Event showClickEvent = new Event().readByLine(showClick);
        return paddingData(requestEvent, showClickEvent);

    }

    public Event paddingData(String request, Event showClickEvent) {
        Event requestEvent = new Event().readByLine(request);
        return paddingData(requestEvent, showClickEvent);

    }

    public Event paddingData(Event requestEvent, String showClick) {
        Event showClickEvent = new Event().readByLine(showClick);
        return paddingData(requestEvent, showClickEvent);

    }

    public Event paddingData(Event requestEvent, Event showClickEvent) {
        int imp = Integer.parseInt(requestEvent.imp) + Integer.parseInt(showClickEvent.imp);
        int click = Integer.parseInt(requestEvent.click) + Integer.parseInt(showClickEvent.click);
        requestEvent.imp = String.valueOf(imp);
        requestEvent.click = String.valueOf(click);
        return requestEvent;

    }

}
